API网关与负载均衡:基础实现思路

在大规模部署大模型API服务时,API网关和负载均衡是确保系统高可用、高性能的关键组件。本节将介绍API网关和负载均衡的基本概念、实现方法以及在大模型服务中的应用。

API网关概述

API网关是系统的入口点,负责接收所有客户端请求并将其路由到适当的后端服务。在大模型服务架构中,API网关扮演着以下重要角色:

  1. 请求路由:根据请求路径和参数将请求转发到相应的服务
  2. 流量控制:实现限流、熔断等流量管理功能
  3. 认证与授权:集中处理用户认证和权限验证
  4. 请求转换:根据需要转换请求和响应格式
  5. 监控和日志:记录请求信息用于监控和分析
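
为了更直观地理解这些职责如何串联,下面给出一个极简的纯 Python 流程示意(仅为演示:密钥、路由表和后端地址均为假设值,实际的请求转发逻辑省略):

python
# 网关职责流水线示意:认证 -> 限流 -> 路由 -> 记录
import time

API_KEYS = {"demo-key": "user1"}                            # 假设的合法密钥表
ROUTES = {"/v1/chat/completions": "http://backend:8000"}    # 假设的路由表
_request_log = {}                                           # user_id -> [请求时间戳]

def handle(path: str, api_key: str, max_per_minute: int = 60):
    # 1. 认证与授权
    user = API_KEYS.get(api_key)
    if user is None:
        return 401, "invalid api key"
    # 2. 流量控制:简单的60秒滑动窗口限流
    now = time.time()
    window = [t for t in _request_log.get(user, []) if now - t < 60]
    if len(window) >= max_per_minute:
        return 429, "rate limit exceeded"
    _request_log[user] = window + [now]
    # 3. 请求路由:根据路径选择后端
    backend = ROUTES.get(path)
    if backend is None:
        return 404, "route not found"
    # 4. 监控和日志:此处可记录请求信息用于分析
    return 200, f"forward to {backend}{path}"

print(handle("/v1/chat/completions", "demo-key"))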

常用API网关解决方案

  1. Nginx:轻量级高性能的Web服务器和反向代理
  2. Kong:基于Nginx的企业级API网关
  3. APISIX:高性能开源API网关
  4. Traefik:现代化的HTTP反向代理和负载均衡器
  5. 云服务商提供的API网关:如AWS API Gateway、阿里云API网关等

负载均衡基础

负载均衡器在大模型服务架构中负责将请求分发到多个后端服务实例,以实现:

  1. 提高系统吞吐量:同时处理更多请求
  2. 增强系统可用性:单实例故障不影响整体服务
  3. 实现弹性扩展:根据负载动态增减服务实例

负载均衡算法

  1. 轮询(Round Robin):请求依次分配给每个后端服务

    python
    # 简单轮询负载均衡算法
    class RoundRobinBalancer:
        def __init__(self, backends):
            self.backends = backends
            self.current_index = 0
            
        def get_next_backend(self):
            backend = self.backends[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.backends)
            return backend
  2. 加权轮询(Weighted Round Robin):根据服务器权重分配请求

    python
    # 加权轮询负载均衡算法
    class WeightedRoundRobinBalancer:
        def __init__(self, backends_with_weights):
            # backends_with_weights格式: [(backend1, weight1), (backend2, weight2), ...]
            self.backends = []
            for backend, weight in backends_with_weights:
                self.backends.extend([backend] * weight)
            self.current_index = 0
            
        def get_next_backend(self):
            backend = self.backends[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.backends)
            return backend
  3. 最少连接(Least Connection):将请求发送到连接数最少的服务器

    python
    # 最少连接负载均衡算法
    class LeastConnectionBalancer:
        def __init__(self, backends):
            self.backends = {backend: 0 for backend in backends}  # backend: connection_count
            
        def get_next_backend(self):
            # 找到连接数最少的后端
            backend = min(self.backends.items(), key=lambda x: x[1])[0]
            self.backends[backend] += 1
            return backend
            
        def release_backend(self, backend):
            # 请求完成后释放连接
            if backend in self.backends:
                self.backends[backend] = max(0, self.backends[backend] - 1)
  4. IP哈希(IP Hash):基于客户端IP地址哈希来分配请求,保证同一客户端请求总是发送到同一服务器

    python
    # IP哈希负载均衡算法
    import hashlib

    class IpHashBalancer:
        def __init__(self, backends):
            self.backends = backends
            
        def get_backend(self, client_ip):
            # 使用稳定哈希(这里用MD5)选择后端:同一客户端IP始终映射到同一后端,
            # 分布也比按八位字节求和更均匀
            hash_value = int(hashlib.md5(client_ip.encode()).hexdigest(), 16)
            index = hash_value % len(self.backends)
            return self.backends[index]
  5. 随机(Random):随机选择后端服务器

    python
    # 随机负载均衡算法
    import random
    
    class RandomBalancer:
        def __init__(self, backends):
            self.backends = backends
            
        def get_next_backend(self):
            return random.choice(self.backends)
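
上述几种均衡器的使用方式类似:每次转发前调用选择方法取得目标后端;最少连接均衡器还需要在请求结束后归还连接。下面是一个简单的使用示意(假设前面几个类定义在同一模块中,后端地址仅为演示值):

python
backends = ["http://backend1:8000", "http://backend2:8000", "http://backend3:8000"]

rr = RoundRobinBalancer(backends)
for _ in range(5):
    print("round robin ->", rr.get_next_backend())

lc = LeastConnectionBalancer(backends)
b = lc.get_next_backend()   # 请求开始,连接数 +1
print("least connection ->", b)
lc.release_backend(b)       # 请求结束,连接数 -1

ip = IpHashBalancer(backends)
print("ip hash ->", ip.get_backend("203.0.113.7"))  # 同一客户端IP始终得到同一后端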

大模型API的网关与负载均衡特性

大模型API服务的特殊性要求网关和负载均衡具备以下特性:

1. 长连接支持

大模型API尤其是在流式响应(streaming)场景下,需要保持较长时间的连接。

nginx
# Nginx配置长连接示例
http {
    # 长连接超时设置
    keepalive_timeout 300s;
    # 每个连接最大请求数
    keepalive_requests 1000;
    
    # 对上游服务器的连接池设置
    upstream deepseek_api {
        server backend1.example.com:8000;
        server backend2.example.com:8000;
        keepalive 32;  # 保持的连接数
    }
    
    server {
        listen 80;
        
        location /v1/chat/completions {
            proxy_pass http://deepseek_api;
            proxy_http_version 1.1;  # 使用HTTP/1.1
            proxy_set_header Connection "";  # 启用keepalive
            proxy_read_timeout 300s;  # 读取超时设置
        }
    }
}
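
在客户端一侧,可以用下面的 Python 脚本验证经过网关的流式长连接是否正常(仅为示意,其中的网关地址与 API Key 为假设值;read 超时应不小于网关的 proxy_read_timeout):

python
import httpx

def stream_chat(prompt: str):
    payload = {
        "model": "deepseek-chat",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,  # 开启流式输出,连接会保持到生成结束
    }
    headers = {"Authorization": "Bearer YOUR_API_KEY"}
    timeout = httpx.Timeout(connect=10.0, read=300.0, write=30.0, pool=10.0)
    with httpx.Client(timeout=timeout) as client:
        with client.stream("POST", "http://gateway.example.com/v1/chat/completions",
                           json=payload, headers=headers) as resp:
            for line in resp.iter_lines():
                if line:
                    print(line)

if __name__ == "__main__":
    stream_chat("你好")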

2. 请求权重动态调整

根据模型性能和负载动态调整不同后端的请求权重:

python
# 动态权重调整示例
import random

class AdaptiveWeightBalancer:
    def __init__(self, backends):
        # 初始化每个后端的权重、响应时间和错误率
        self.backends = backends
        self.weights = {backend: 10 for backend in backends}  # 初始权重
        self.response_times = {backend: [] for backend in backends}  # 最近响应时间列表
        self.error_rates = {backend: 0 for backend in backends}  # 错误率
        
    def update_metrics(self, backend, response_time, is_error):
        # 更新后端的指标
        self.response_times[backend].append(response_time)
        # 保留最近20个响应时间记录
        if len(self.response_times[backend]) > 20:
            self.response_times[backend].pop(0)
        
        # 更新错误率
        if is_error:
            self.error_rates[backend] = self.error_rates[backend] * 0.9 + 0.1
        else:
            self.error_rates[backend] = self.error_rates[backend] * 0.9
        
        # 重新计算权重
        self._recalculate_weights()
        
    def _recalculate_weights(self):
        for backend in self.backends:
            # 计算平均响应时间
            avg_response_time = sum(self.response_times[backend]) / max(1, len(self.response_times[backend]))
            # 基于响应时间和错误率调整权重
            # 响应时间越低,错误率越低,权重越高
            response_factor = 1000 / max(1, avg_response_time)  # 响应时间因子
            error_factor = 1 - self.error_rates[backend]  # 错误率因子
            
            # 计算新权重
            self.weights[backend] = max(1, int(10 * response_factor * error_factor))
    
    def get_next_backend(self):
        # 创建加权列表
        weighted_backends = []
        for backend, weight in self.weights.items():
            weighted_backends.extend([backend] * weight)
        
        # 随机选择
        return random.choice(weighted_backends) if weighted_backends else random.choice(self.backends)
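
下面是 AdaptiveWeightBalancer 的一个简单使用示意(假设响应时间以毫秒为单位,后端地址为演示值):模拟 backend1 稳定而 backend2 偶发错误后,backend1 的权重会明显升高,从而获得更多流量。

python
balancer = AdaptiveWeightBalancer(["http://backend1:8000", "http://backend2:8000"])

# 模拟一段时间的请求反馈
for _ in range(20):
    balancer.update_metrics("http://backend1:8000", response_time=120, is_error=False)
    balancer.update_metrics("http://backend2:8000", response_time=800,
                            is_error=(random.random() < 0.2))

print(balancer.weights)             # backend1 的权重应明显高于 backend2
print(balancer.get_next_backend())  # 后续请求更多地被分配到 backend1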

3. 资源感知路由

根据服务器资源使用情况动态路由请求:

python
# 资源感知路由示例
import random

class ResourceAwareRouter:
    def __init__(self, backends):
        self.backends = backends
        self.resource_metrics = {backend: {'cpu': 0, 'gpu': 0, 'memory': 0} for backend in backends}
        
    def update_metrics(self, backend, metrics):
        # 更新后端服务器资源使用指标
        self.resource_metrics[backend] = metrics
        
    def get_best_backend(self):
        # 根据资源使用情况选择最佳后端
        # 这里使用GPU使用率作为关键指标
        best_backend = None
        lowest_gpu_usage = float('inf')
        
        for backend, metrics in self.resource_metrics.items():
            if metrics['gpu'] < lowest_gpu_usage:
                lowest_gpu_usage = metrics['gpu']
                best_backend = backend
                
        return best_backend or random.choice(self.backends)
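
资源指标通常由一个独立的采集任务周期性更新。下面是一种可能的做法(假设每个后端暴露一个返回 {"cpu": .., "gpu": .., "memory": ..} 的 /metrics 接口,该接口的地址与格式均为演示假设):

python
import httpx

router = ResourceAwareRouter(["http://backend1:8000", "http://backend2:8000"])

def refresh_metrics():
    for backend in router.backends:
        try:
            resp = httpx.get(f"{backend}/metrics", timeout=2.0)
            router.update_metrics(backend, resp.json())
        except httpx.HTTPError:
            # 拉取失败时视为满负载,避免该后端被选中
            router.update_metrics(backend, {"cpu": 100, "gpu": 100, "memory": 100})

refresh_metrics()
print(router.get_best_backend())  # 返回当前 GPU 占用最低的后端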

实践:构建大模型API网关

下面通过几个实际例子,展示如何构建大模型API网关:

1. 使用Nginx作为简单API网关

nginx
# Nginx API网关配置示例
http {
    # 定义不同模型的上游服务器组
    upstream deepseek_chat {
        server deepseek1:8000 weight=3;
        server deepseek2:8000 weight=2;
    }
    
    upstream deepseek_lite {
        server deepseek_lite1:8000;
        server deepseek_lite2:8000;
    }
    
    # 限流配置
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
    
    server {
        listen 80;
        
        # API密钥验证
        set $api_key "";
        if ($http_authorization ~ "Bearer (.+)") {
            set $api_key $1;
        }
        
        # 拒绝无效密钥
        if ($api_key = "") {
            return 401 '{"error":"Missing API key"}';
        }
        
        # 模型路由 - Chat Completions API
        location /v1/chat/completions {
            # 应用限流
            limit_req zone=api_limit burst=20 nodelay;
            
            # 根据请求体内容选择上游服务器
            # 注意:原生 Nginx 在 rewrite 阶段通常读取不到 $request_body,
            # 基于请求体的路由一般需要借助 njs 或 OpenResty(Lua)实现,此处仅作示意
            set $upstream "deepseek_chat";
            if ($request_body ~ "model[\"']?\s*:\s*[\"']?deepseek-lite[\"']?") {
                set $upstream "deepseek_lite";
            }
            
            proxy_pass http://$upstream;
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header X-Real-IP $remote_addr;
            
            # 请求超时设置
            proxy_connect_timeout 10s;
            proxy_send_timeout 60s;
            proxy_read_timeout 300s;
            
            # 缓冲设置
            proxy_buffering off;  # 关闭缓冲以支持流式响应
        }
        
        # 健康检查端点
        location /health {
            return 200 '{"status":"healthy"}';
        }
    }
}

2. 使用Kong作为企业级API网关

Kong 是一款基于 Nginx 的高性能、可扩展的企业级 API 网关,广泛应用于微服务、云原生和高并发场景。它支持丰富的插件体系,能够实现认证、限流、监控、日志等多种 API 管理功能。

2.1 Kong简介与优势

  • 高性能:基于 Nginx,转发效率高,支持大规模并发。
  • 插件丰富:支持认证、限流、安全、监控等多种插件,灵活扩展。
  • 云原生友好:支持 Docker、Kubernetes 等多种部署方式。
  • 自动化与可视化:提供 Admin API 和 Kong Manager 图形界面,便于自动化和可视化管理。

2.2 Kong的部署方式

Kong 支持多种部署方式,推荐使用 Docker 或 Kubernetes 进行快速部署。

  • Docker 部署示例
    bash
    # 启动 PostgreSQL 数据库
    docker run -d --name kong-database \
      -p 5432:5432 \
      -e POSTGRES_USER=kong \
      -e POSTGRES_DB=kong \
      -e POSTGRES_PASSWORD=kong \
      postgres:13
    
    # 初始化数据库(首次部署需要先执行迁移)
    docker run --rm \
      --link kong-database:kong-database \
      -e KONG_DATABASE=postgres \
      -e KONG_PG_HOST=kong-database \
      -e KONG_PG_PASSWORD=kong \
      kong:latest kong migrations bootstrap
    
    # 启动 Kong 网关
    docker run -d --name kong \
      --link kong-database:kong-database \
      -e KONG_DATABASE=postgres \
      -e KONG_PG_HOST=kong-database \
      -e KONG_PG_PASSWORD=kong \
      -e KONG_PROXY_ACCESS_LOG=/dev/stdout \
      -e KONG_ADMIN_ACCESS_LOG=/dev/stdout \
      -e KONG_PROXY_ERROR_LOG=/dev/stderr \
      -e KONG_ADMIN_ERROR_LOG=/dev/stderr \
      -e "KONG_ADMIN_LISTEN=0.0.0.0:8001, 0.0.0.0:8444 ssl" \
      -p 8000:8000 -p 8443:8443 -p 8001:8001 -p 8444:8444 \
      kong:latest
  • Kubernetes 部署:可使用 Kong Ingress Controller 进行集成。

2.3 Kong的核心配置流程

Kong 的 API 管理主要包括服务(Service)、路由(Route)、插件(Plugin)、消费者(Consumer)等核心对象。

  1. 注册后端服务(Service)

代表实际的后端 API 服务。

bash
curl -i -X POST http://localhost:8001/services \
  --data "name=deepseek-service" \
  --data "url=http://deepseek-backend"
  2. 创建路由(Route)

定义请求路径与后端服务的映射关系。

bash
curl -i -X POST http://localhost:8001/services/deepseek-service/routes \
  --data "paths[]=/v1/chat/completions" \
  --data "name=chat-completions-route"
  3. 启用插件(Plugin)

Kong 支持为服务或路由启用多种插件。

  • API密钥认证
    bash
    curl -i -X POST http://localhost:8001/services/deepseek-service/plugins \
      --data "name=key-auth"
  • 请求头转换
    bash
    curl -i -X POST http://localhost:8001/services/deepseek-service/plugins \
      --data "name=request-transformer" \
      --data "config.add.headers[]=X-API-Version:v1"
  • 限流
    bash
    curl -i -X POST http://localhost:8001/services/deepseek-service/plugins \
      --data "name=rate-limiting" \
      --data "config.minute=60" \
      --data "config.policy=local"
  • 健康检查(Kong 的主动健康检查配置在 upstream 对象上,而非以插件形式启用)
    bash
    # 创建带主动健康检查的 upstream,并把后端节点注册为 target
    # (若要让 deepseek-service 经过该 upstream,需将服务的 host 指向 deepseek-upstream)
    curl -i -X POST http://localhost:8001/upstreams \
      --data "name=deepseek-upstream" \
      --data "healthchecks.active.healthy.interval=10" \
      --data "healthchecks.active.unhealthy.interval=5"
    curl -i -X POST http://localhost:8001/upstreams/deepseek-upstream/targets \
      --data "target=deepseek-backend:8000"
  4. 消费者与密钥管理(Consumer & Key)

为 API 用户分配密钥,实现访问控制。

bash
# 创建消费者
curl -i -X POST http://localhost:8001/consumers --data "username=deepseek-user"
# 为消费者生成密钥
curl -i -X POST http://localhost:8001/consumers/deepseek-user/key-auth

2.4 Kong的典型应用场景

  • 统一认证与授权:集中管理 API 访问权限。
  • 流量控制与限流:防止恶意刷接口,保护后端服务。
  • API 监控与日志:集成 Prometheus、ELK 等监控与日志系统。
  • 动态路由与灰度发布:灵活配置流量分发策略。

2.5 可视化与自动化管理

  • Kong Manager:官方可视化管理界面,支持服务、路由、插件等对象的可视化配置。
  • Kong Admin API:支持所有操作的自动化脚本管理,便于 CI/CD 集成。
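
作为自动化管理的一个简单示意,下面用 Python(httpx)直接调用 Kong Admin API 完成服务、路由与插件的注册,对象名称与前文示例一致(脚本未处理对象已存在等幂等问题,仅为演示):

python
import httpx

ADMIN_URL = "http://localhost:8001"  # Kong Admin API 地址

def setup_kong():
    # 注册后端服务
    httpx.post(f"{ADMIN_URL}/services", data={
        "name": "deepseek-service",
        "url": "http://deepseek-backend",
    }).raise_for_status()
    # 创建路由
    httpx.post(f"{ADMIN_URL}/services/deepseek-service/routes", data={
        "name": "chat-completions-route",
        "paths[]": "/v1/chat/completions",
    }).raise_for_status()
    # 启用密钥认证与限流插件
    for plugin in ({"name": "key-auth"},
                   {"name": "rate-limiting", "config.minute": "60", "config.policy": "local"}):
        httpx.post(f"{ADMIN_URL}/services/deepseek-service/plugins", data=plugin).raise_for_status()

if __name__ == "__main__":
    setup_kong()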

2.6 实践案例:完整配置流程

以下为典型的 Kong API 网关配置流程脚本,适合快速上手:

bash
# 1. 注册后端服务
curl -i -X POST http://localhost:8001/services \
  --data "name=deepseek-service" \
  --data "url=http://deepseek-backend"

# 2. 创建路由
curl -i -X POST http://localhost:8001/services/deepseek-service/routes \
  --data "paths[]=/v1/chat/completions" \
  --data "name=chat-completions-route"

# 3. 启用密钥认证插件
curl -i -X POST http://localhost:8001/services/deepseek-service/plugins \
  --data "name=key-auth"

# 4. 启用请求头转换插件
curl -i -X POST http://localhost:8001/services/deepseek-service/plugins \
  --data "name=request-transformer" \
  --data "config.add.headers[]=X-API-Version:v1"

# 5. 启用限流插件
curl -i -X POST http://localhost:8001/services/deepseek-service/plugins \
  --data "name=rate-limiting" \
  --data "config.minute=60" \
  --data "config.policy=local"

# 6. 配置带主动健康检查的 upstream(Kong 的健康检查在 upstream 对象上配置)
curl -i -X POST http://localhost:8001/upstreams \
  --data "name=deepseek-upstream" \
  --data "healthchecks.active.healthy.interval=10" \
  --data "healthchecks.active.unhealthy.interval=5"
curl -i -X POST http://localhost:8001/upstreams/deepseek-upstream/targets \
  --data "target=deepseek-backend:8000"

# 7. 创建消费者并分配密钥
curl -i -X POST http://localhost:8001/consumers --data "username=deepseek-user"
curl -i -X POST http://localhost:8001/consumers/deepseek-user/key-auth

通过上述流程,Kong 可实现企业级 API 网关的统一认证、限流、监控、健康检查等功能,支持高可用、高扩展性的大模型 API 服务架构。

3. 自定义Python API网关

使用Python构建一个简单的API网关:

python
# 使用FastAPI构建简单API网关
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.responses import StreamingResponse
import httpx
import asyncio
import random
import time
import json

app = FastAPI()

# 后端服务配置
BACKENDS = {
    "deepseek-chat": ["http://backend1:8000", "http://backend2:8000"],
    "deepseek-lite": ["http://backend3:8000", "http://backend4:8000"]
}

# 简单的API密钥存储
API_KEYS = {
    "key1": {"user_id": "user1", "tier": "premium"},
    "key2": {"user_id": "user2", "tier": "basic"}
}

# 限流器
class RateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.user_requests = {}  # user_id -> [(timestamp, request_tokens), ...]
        
    async def check_limit(self, user_id, request_tokens=1):
        current_time = time.time()
        
        # 初始化用户请求记录
        if user_id not in self.user_requests:
            self.user_requests[user_id] = []
            
        # 清理过期请求
        self.user_requests[user_id] = [
            (ts, tokens) for ts, tokens in self.user_requests[user_id]
            if current_time - ts < self.window_seconds
        ]
        
        # 计算当前窗口的请求总数
        total_requests = len(self.user_requests[user_id])
        
        # 检查是否超出限制
        if total_requests >= self.max_requests:
            return False
            
        # 记录新请求
        self.user_requests[user_id].append((current_time, request_tokens))
        return True

# 创建限流器实例
minute_limiter = RateLimiter(max_requests=60, window_seconds=60)

# 负载均衡器
class LoadBalancer:
    def __init__(self, backends):
        self.backends = backends
        self.current_index = 0
        
    def get_next_backend(self):
        if not self.backends:
            raise HTTPException(status_code=503, detail="No backend servers available")
            
        backend = self.backends[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.backends)
        return backend

# 创建负载均衡器实例
balancers = {
    model: LoadBalancer(backends) for model, backends in BACKENDS.items()
}

# 验证API密钥
async def verify_api_key(request: Request):
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid API key")
        
    api_key = auth_header.replace("Bearer ", "")
    if api_key not in API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
        
    return API_KEYS[api_key]

# 流式响应处理:在生成器内部保持 httpx 客户端打开,边读边转发后端的字节流
async def stream_backend_response(backend_url, body):
    async with httpx.AsyncClient(timeout=300.0) as client:
        async with client.stream(
            "POST",
            f"{backend_url}/v1/chat/completions",
            json=body,
            headers={"Content-Type": "application/json"},
        ) as backend_response:
            async for chunk in backend_response.aiter_bytes():
                yield chunk

# API路由
@app.post("/v1/chat/completions")
async def chat_completions(request: Request, user_info = Depends(verify_api_key)):
    # 解析请求体
    body = await request.json()
    model_name = body.get("model", "deepseek-chat")
    
    # 检查模型是否存在
    if model_name not in BACKENDS:
        raise HTTPException(status_code=400, detail=f"Unsupported model: {model_name}")
    
    # 检查用户权限
    user_tier = user_info["tier"]
    if user_tier == "basic" and model_name != "deepseek-lite":
        raise HTTPException(status_code=403, detail="Premium models not available for basic tier")
    
    # 限流检查
    user_id = user_info["user_id"]
    if not await minute_limiter.check_limit(user_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    
    # 负载均衡选择后端
    backend_url = balancers[model_name].get_next_backend()
    
    # 流式请求:使用 StreamingResponse 持续转发后端输出
    if body.get("stream", False):
        return StreamingResponse(
            stream_backend_response(backend_url, body),
            media_type="text/event-stream"
        )
    
    # 普通请求:转发到后端并一次性返回结果
    async with httpx.AsyncClient() as client:
        try:
            # 设置较长的超时时间
            backend_response = await client.post(
                f"{backend_url}/v1/chat/completions",
                json=body,
                headers={"Content-Type": "application/json"},
                timeout=300.0
            )
            
            # 返回普通响应
            return Response(
                content=backend_response.content,
                status_code=backend_response.status_code,
                media_type=backend_response.headers.get("Content-Type", "application/json")
            )
                
        except httpx.TimeoutException:
            raise HTTPException(status_code=504, detail="Backend server timeout")
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

# 健康检查端点
@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# 启动服务器
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

高级特性:灰度发布与A/B测试

API网关还可以支持灰度发布和A/B测试等高级特性:

1. 灰度发布配置

逐步将流量迁移到新版本服务:

nginx
# Nginx灰度发布配置
http {
    # 定义版本分组
    upstream deepseek_v1 {
        server deepseek_v1_1:8000;
        server deepseek_v1_2:8000;
    }
    
    upstream deepseek_v2 {
        server deepseek_v2_1:8000;
        server deepseek_v2_2:8000;
    }
    
    # 灰度发布配置
    split_clients "${remote_addr}${request_uri}" $deepseek_version {
        20%     deepseek_v2;  # 20%流量到新版本
        *       deepseek_v1;  # 其余流量到旧版本
    }
    
    server {
        listen 80;
        
        location /v1/chat/completions {
            proxy_pass http://$deepseek_version;
            # 其他代理配置...
        }
    }
}
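
如果灰度逻辑放在自建网关内,也可以用与 split_clients 等价的确定性哈希来分流,保证同一客户端与路径的分流结果稳定。下面是一个 Python 示意(版本后端地址与比例均为假设值):

python
import hashlib

V1_BACKENDS = ["http://deepseek-v1-1:8000", "http://deepseek-v1-2:8000"]
V2_BACKENDS = ["http://deepseek-v2-1:8000", "http://deepseek-v2-2:8000"]
CANARY_PERCENT = 20  # 20% 流量进入新版本

def pick_version(client_ip: str, request_uri: str) -> list:
    # 与 Nginx split_clients 类似:对同一个 key 做哈希并映射到 0-99 的桶
    key = f"{client_ip}{request_uri}".encode()
    bucket = int(hashlib.md5(key).hexdigest(), 16) % 100
    return V2_BACKENDS if bucket < CANARY_PERCENT else V1_BACKENDS

print(pick_version("203.0.113.7", "/v1/chat/completions"))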

2. A/B测试配置

根据请求参数或头信息将流量分发到不同实现:

python
# FastAPI A/B测试示例
@app.post("/v1/chat/completions")
async def chat_completions(request: Request, user_info = Depends(verify_api_key)):
    # 解析请求体
    body = await request.json()
    
    # 读取测试组标记
    test_group = request.headers.get("X-Test-Group", "")
    
    # 根据测试组选择后端
    backend_url = None
    if test_group == "experiment":
        # 实验组使用新模型
        backend_url = "http://new-model-backend:8000"
    elif test_group == "control":
        # 对照组使用原有模型
        backend_url = "http://original-model-backend:8000"
    else:
        # 随机分配测试组
        if random.random() < 0.5:
            backend_url = "http://new-model-backend:8000"
            test_group = "experiment"
        else:
            backend_url = "http://original-model-backend:8000"
            test_group = "control"
    
    # 转发请求到后端
    async with httpx.AsyncClient() as client:
        backend_response = await client.post(
            f"{backend_url}/v1/chat/completions",
            json=body,
            headers={
                "Content-Type": "application/json",
                "X-Test-Group": test_group  # 传递测试组信息给后端
            }
        )
        
        # 记录测试数据(log_test_data 为自定义函数,下面给出一种实现示意)
        log_test_data(user_info["user_id"], test_group, body, backend_response.json())
        
        # 返回响应
        return Response(
            content=backend_response.content,
            status_code=backend_response.status_code,
            media_type=backend_response.headers.get("Content-Type", "application/json")
        )
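
其中 log_test_data 需要自行实现。下面给出一种最简单的落盘记录示意(字段与文件路径均为假设,生产环境中更常见的做法是异步写入日志系统或消息队列):

python
import json
import time

def log_test_data(user_id: str, test_group: str, request_body: dict, response_body: dict):
    """将 A/B 测试的请求与响应摘要追加写入本地 JSONL 文件,便于离线分析"""
    record = {
        "timestamp": time.time(),
        "user_id": user_id,
        "test_group": test_group,
        "model": request_body.get("model"),
        "message_count": len(request_body.get("messages", [])),
        "usage": response_body.get("usage"),  # 假设后端返回 OpenAI 风格的 usage 字段
    }
    with open("ab_test_log.jsonl", "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")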

容器环境中的服务发现与动态配置

在容器化环境(如Kubernetes)中,API网关和负载均衡可以与服务发现机制结合:

1. Kubernetes Ingress控制器

使用Kubernetes Ingress作为API网关:

yaml
# Kubernetes Ingress配置
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: deepseek-api-ingress
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
spec:
  rules:
  - host: api.example.com
    http:
      paths:
      - path: /v1/chat/completions
        pathType: Prefix
        backend:
          service:
            name: deepseek-api-service
            port:
              number: 80

2. 服务网格(Service Mesh)

在复杂微服务架构中,可以使用Istio等服务网格来管理API流量:

yaml
# Istio VirtualService配置
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: deepseek-api
spec:
  hosts:
  - api.example.com
  gateways:
  - api-gateway
  http:
  - match:
    - uri:
        prefix: /v1/chat/completions
    route:
    - destination:
        host: deepseek-api.default.svc.cluster.local
        port:
          number: 80
      weight: 90
    - destination:
        host: deepseek-api-new.default.svc.cluster.local
        port:
          number: 80
      weight: 10  # 10%流量到新版本
    timeout: 300s

Python和Node.js开源API网关方案

除了上述传统的API网关方案外,Python 和 Node.js 生态中也有一些专门的网关项目,便于与大模型API服务集成。

Python侧开源方案

  1. FastAPI Gateway

    • 基于FastAPI构建的轻量级API网关
    • 支持路由、认证、限流等功能
    • 适合中小规模应用
  2. KongPy

    • Python版的Kong API网关客户端
    • 可以通过Python代码管理Kong配置
  3. Falcon Gateway

    • 基于Falcon框架的高性能API网关
    • 专注于低延迟高吞吐量

Node.js侧开源方案

  1. Express Gateway
    • 基于Express.js的API网关
    • 功能全面,生态成熟
    • 适合JavaScript开发团队

综合性高级API网关实现

下面是一个使用FastAPI构建的更加综合性的API网关实现,包含了更多高级功能:

python
from fastapi import FastAPI, Request, Response, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import httpx
import asyncio
import time
import random
import json
import logging
from typing import Dict, List, Optional, Any
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import uuid

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("api-gateway")

# 创建FastAPI应用
app = FastAPI(title="DeepSeek API Gateway")

# 添加CORS中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Prometheus指标
REQUEST_COUNT = Counter('api_requests_total', 'Total API requests', ['endpoint', 'status'])
REQUEST_LATENCY = Histogram('api_request_latency_seconds', 'API request latency', ['endpoint'])
BACKEND_LOAD = Gauge('backend_load', 'Current backend load', ['backend'])
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Current active connections')

# 后端服务配置
BACKEND_SERVICES = {
    "deepseek-chat": {
        "endpoints": [
            {"url": "http://backend1:8000", "weight": 3, "health": True},
            {"url": "http://backend2:8000", "weight": 2, "health": True}
        ],
        "timeout": 300.0,
        "retry_count": 1
    },
    "deepseek-lite": {
        "endpoints": [
            {"url": "http://backend3:8000", "weight": 1, "health": True},
            {"url": "http://backend4:8000", "weight": 1, "health": True}
        ],
        "timeout": 120.0,
        "retry_count": 2
    }
}

# API密钥配置
API_KEYS = {
    "sk-1234567890abcdef": {"user_id": "user1", "tier": "premium", "rate_limit": 100},
    "sk-0987654321fedcba": {"user_id": "user2", "tier": "basic", "rate_limit": 20}
}

# 请求会话存储
active_sessions: Dict[str, Dict[str, Any]] = {}

# 自适应负载均衡器
class AdaptiveLoadBalancer:
    def __init__(self, service_config):
        self.endpoints = service_config["endpoints"]
        self.response_times = {endpoint["url"]: [] for endpoint in self.endpoints}
        self.error_counts = {endpoint["url"]: 0 for endpoint in self.endpoints}
        self.last_updated = {endpoint["url"]: time.time() for endpoint in self.endpoints}
    
    def get_backend(self):
        """选择最佳后端服务"""
        available_backends = [ep for ep in self.endpoints if ep["health"]]
        if not available_backends:
            raise HTTPException(status_code=503, detail="No healthy backend available")
        
        # 计算每个后端的分数 (低分更好)
        scores = []
        for backend in available_backends:
            url = backend["url"]
            
            # 计算平均响应时间
            if self.response_times[url]:
                avg_response_time = sum(self.response_times[url]) / len(self.response_times[url])
            else:
                avg_response_time = 0.1  # 默认值
            
            # 计算错误率 (指数退避)
            error_penalty = self.error_counts[url] ** 2
            
            # 计算总分 (响应时间 + 错误惩罚 - 权重加成)
            score = avg_response_time + error_penalty - backend["weight"]
            scores.append((backend, score))
        
        # 按分数排序并选择最佳后端 (80%选择最佳, 20%随机选择其他)
        scores.sort(key=lambda x: x[1])
        if random.random() < 0.8 or len(scores) == 1:
            return scores[0][0]
        else:
            return random.choice(scores[1:] if len(scores) > 1 else scores)[0]
    
    def update_metrics(self, backend_url, response_time, is_error):
        """更新后端指标"""
        # 更新响应时间
        self.response_times[backend_url].append(response_time)
        if len(self.response_times[backend_url]) > 10:
            self.response_times[backend_url].pop(0)
        
        # 更新错误计数
        if is_error:
            self.error_counts[backend_url] += 1
        else:
            self.error_counts[backend_url] = max(0, self.error_counts[backend_url] - 0.1)
            
        # 更新最后访问时间
        self.last_updated[backend_url] = time.time()
        
        # 更新Prometheus指标
        BACKEND_LOAD.labels(backend=backend_url).set(len(self.response_times[backend_url]))

# 创建每个服务的负载均衡器
load_balancers = {
    service: AdaptiveLoadBalancer(config) 
    for service, config in BACKEND_SERVICES.items()
}

# 速率限制器
class RateLimiter:
    def __init__(self):
        self.request_counts = {}  # user_id -> [(timestamp, count), ...]
    
    def check_limit(self, user_id, rate_limit, window_seconds=60):
        """检查用户是否超过速率限制"""
        current_time = time.time()
        
        # 初始化用户请求记录
        if user_id not in self.request_counts:
            self.request_counts[user_id] = []
        
        # 清理过期请求记录
        self.request_counts[user_id] = [
            (ts, count) for ts, count in self.request_counts[user_id]
            if current_time - ts < window_seconds
        ]
        
        # 计算当前窗口的请求总数
        total_requests = sum(count for _, count in self.request_counts[user_id])
        
        # 检查是否超过限制
        if total_requests >= rate_limit:
            return False
        
        # 记录新请求
        self.request_counts[user_id].append((current_time, 1))
        return True

# 创建速率限制器实例
rate_limiter = RateLimiter()

# 验证API密钥
async def verify_api_key(request: Request):
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid API key")
    
    api_key = auth_header.replace("Bearer ", "")
    if api_key not in API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
    
    return API_KEYS[api_key]

# 后台健康检查任务
async def health_check_task():
    """定期检查后端服务健康状态"""
    http_client = httpx.AsyncClient(timeout=5.0)
    while True:
        for service_name, service_config in BACKEND_SERVICES.items():
            for endpoint in service_config["endpoints"]:
                try:
                    response = await http_client.get(f"{endpoint['url']}/health")
                    endpoint["health"] = response.status_code == 200
                except Exception:
                    endpoint["health"] = False
                
                logger.info(f"Health check: {endpoint['url']} is {'healthy' if endpoint['health'] else 'unhealthy'}")
        
        await asyncio.sleep(30)  # 每30秒检查一次

# 启动后台任务
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(health_check_task())

# 暴露Prometheus指标
@app.get("/metrics")
async def metrics():
    return Response(content=prometheus_client.generate_latest(), media_type="text/plain")

# 健康检查端点
@app.get("/health")
async def health():
    return {"status": "healthy"}

# 获取会话状态
@app.get("/sessions/{session_id}")
async def get_session(session_id: str):
    if session_id not in active_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    return active_sessions[session_id]

# 核心API路由 - 聊天补全API
@app.post("/v1/chat/completions")
async def chat_completions(request: Request, background_tasks: BackgroundTasks, user_info = Depends(verify_api_key)):
    # 创建请求ID和跟踪会话
    request_id = str(uuid.uuid4())
    session_id = request.headers.get("X-Session-ID", request_id)
    
    # 记录会话开始
    start_time = time.time()
    ACTIVE_CONNECTIONS.inc()
    
    # 解析请求体
    try:
        body = await request.json()
    except json.JSONDecodeError:
        ACTIVE_CONNECTIONS.dec()
        raise HTTPException(status_code=400, detail="Invalid JSON body")
    
    # 获取模型名称
    model_name = body.get("model", "deepseek-chat")
    if model_name not in BACKEND_SERVICES:
        ACTIVE_CONNECTIONS.dec()
        raise HTTPException(status_code=400, detail=f"Unsupported model: {model_name}")
    
    # 检查用户权限
    user_tier = user_info["tier"]
    if user_tier == "basic" and model_name == "deepseek-chat":
        ACTIVE_CONNECTIONS.dec()
        raise HTTPException(status_code=403, detail="Premium models not available for basic tier")
    
    # 检查速率限制
    user_id = user_info["user_id"]
    rate_limit = user_info["rate_limit"]
    if not rate_limiter.check_limit(user_id, rate_limit):
        ACTIVE_CONNECTIONS.dec()
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    
    # 选择后端服务
    service_config = BACKEND_SERVICES[model_name]
    backend = load_balancers[model_name].get_backend()
    backend_url = backend["url"]
    
    # 创建会话记录
    session = {
        "id": session_id,
        "user_id": user_id,
        "model": model_name,
        "backend": backend_url,
        "start_time": start_time,
        "status": "processing"
    }
    active_sessions[session_id] = session
    
    # 添加自定义头信息
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": request_id,
        "X-Session-ID": session_id,
        "X-User-ID": user_id,
        "X-User-Tier": user_tier
    }
    
    # 请求处理函数
    async def process_request(stream=False):
        nonlocal backend_url
        retry_count = service_config["retry_count"]
        
        while retry_count >= 0:
            try:
                async with httpx.AsyncClient(timeout=service_config["timeout"]) as client:
                    request_start = time.time()
                    
                    # 发送请求到后端
                    response = await client.post(
                        f"{backend_url}/v1/chat/completions",
                        json=body,
                        headers=headers
                    )
                    
                    request_duration = time.time() - request_start
                    is_error = response.status_code >= 400
                    
                    # 更新负载均衡指标
                    load_balancers[model_name].update_metrics(
                        backend_url, request_duration, is_error
                    )
                    
                    # 记录指标
                    REQUEST_LATENCY.labels(endpoint="/v1/chat/completions").observe(request_duration)
                    REQUEST_COUNT.labels(endpoint="/v1/chat/completions", status=str(response.status_code)).inc()
                    
                    # 更新会话状态
                    session["end_time"] = time.time()
                    session["duration"] = session["end_time"] - session["start_time"]
                    session["status"] = "completed" if response.status_code < 400 else "failed"
                    session["status_code"] = response.status_code
                    
                    return response
                    
            except Exception as e:
                logger.error(f"Backend request failed: {str(e)}")
                retry_count -= 1
                
                if retry_count >= 0:
                    # 尝试选择不同的后端
                    new_backend = load_balancers[model_name].get_backend()
                    if new_backend["url"] != backend_url:
                        backend_url = new_backend["url"]
                        logger.info(f"Retrying with different backend: {backend_url}")
                    else:
                        logger.info(f"Retrying with same backend: {backend_url}")
                else:
                    # 更新会话状态
                    session["end_time"] = time.time()
                    session["duration"] = session["end_time"] - session["start_time"]
                    session["status"] = "failed"
                    session["error"] = str(e)
                    
                    ACTIVE_CONNECTIONS.dec()
                    raise HTTPException(status_code=503, detail="Service unavailable")
    
    # 处理流式响应
    if body.get("stream", False):
        async def stream_response():
            try:
                response = await process_request(stream=True)
                
                # 转发流式响应
                async for chunk in response.aiter_bytes():
                    yield chunk
                
                ACTIVE_CONNECTIONS.dec()
            except Exception as e:
                logger.error(f"Stream error: {str(e)}")
                ACTIVE_CONNECTIONS.dec()
                yield f"data: {{\"error\": \"{str(e)}\"}}\n\n".encode()
        
        # 返回流式响应(注意:process_request 内部使用普通 POST,后端响应会先被完整读取再转发)
        return StreamingResponse(
            stream_response(),
            media_type="text/event-stream"
        )
    else:
        # 处理普通响应
        try:
            response = await process_request()
            background_tasks.add_task(lambda: ACTIVE_CONNECTIONS.dec())
            
            # 返回普通响应
            return Response(
                content=response.content,
                status_code=response.status_code,
                media_type=response.headers.get("Content-Type", "application/json")
            )
        except Exception as e:
            ACTIVE_CONNECTIONS.dec()
            raise e

# 内部统计信息API
@app.get("/internal/stats")
async def internal_stats():
    return {
        "active_connections": prometheus_client.REGISTRY.get_sample_value('active_connections'),
        "backends": {
            service: [
                {
                    "url": endpoint["url"],
                    "health": endpoint["health"],
                    "weight": endpoint["weight"],
                    "load": prometheus_client.REGISTRY.get_sample_value('backend_load', {"backend": endpoint["url"]}) or 0
                }
                for endpoint in config["endpoints"]
            ]
            for service, config in BACKEND_SERVICES.items()
        },
        "sessions": len(active_sessions)
    }

# 启动服务器
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
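
网关启动后,可以用下面的小脚本做一次冒烟测试(网关地址与 API Key 对应上文配置中的演示值,实际可用性还依赖后端服务在线):

python
import httpx

GATEWAY = "http://localhost:8080"
API_KEY = "sk-1234567890abcdef"  # 对应上面 API_KEYS 中的 premium 用户

def smoke_test():
    headers = {"Authorization": f"Bearer {API_KEY}"}
    payload = {
        "model": "deepseek-chat",
        "messages": [{"role": "user", "content": "你好"}],
        "stream": False,
    }
    with httpx.Client(timeout=310.0) as client:
        print(client.get(f"{GATEWAY}/health").json())          # 1. 健康检查
        resp = client.post(f"{GATEWAY}/v1/chat/completions",
                           json=payload, headers=headers)       # 2. 一次普通请求
        print(resp.status_code)
        print(client.get(f"{GATEWAY}/internal/stats").json())   # 3. 网关内部统计
    return resp

if __name__ == "__main__":
    smoke_test()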

开源方案使用指南

以下是几个流行的Python API网关方案的使用指南:

1. ApiGear - Python开源API网关

ApiGear是一个轻量级的Python API网关,专注于API管理和文档生成。

安装和使用

bash
# 安装ApiGear
pip install apigear

# 创建配置文件 apigear.yml
cat > apigear.yml << EOF
services:
  deepseek-api:
    endpoints:
      - url: http://backend1:8000
        weight: 3
      - url: http://backend2:8000
        weight: 2
    routes:
      - path: /v1/chat/completions
        methods: [POST]
EOF

# 启动API网关
apigear start

2. FastAPI Gateway

也可以直接基于 FastAPI 自行实现一个简单的反向代理式 API 网关。

安装和使用

bash
# 安装依赖
pip install fastapi uvicorn httpx

# 创建基本API网关
cat > gateway.py << EOF
from fastapi import FastAPI, Request, Response
import httpx

app = FastAPI()

BACKEND_SERVICES = {
    "chat": "http://chat-backend:8000",
    "embeddings": "http://embeddings-backend:8000"
}

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway(request: Request, path: str):
    # 路由逻辑
    if path.startswith("v1/chat"):
        backend = BACKEND_SERVICES["chat"]
    elif path.startswith("v1/embeddings"):
        backend = BACKEND_SERVICES["embeddings"]
    else:
        return {"error": "Service not found"}
    
    # 转发请求(使用 async with 确保连接在请求结束后被释放)
    url = f"{backend}/{path}"
    
    # 构建请求头(去掉 host,由 httpx 根据目标地址重新生成)
    headers = {k: v for k, v in request.headers.items() if k.lower() != "host"}
    
    # 根据请求方法处理
    method = request.method
    async with httpx.AsyncClient() as client:
        if method == "GET":
            response = await client.get(url, headers=headers, params=request.query_params)
        elif method == "POST":
            body = await request.body()
            response = await client.post(url, headers=headers, content=body)
        elif method == "PUT":
            body = await request.body()
            response = await client.put(url, headers=headers, content=body)
        else:  # DELETE
            response = await client.delete(url, headers=headers)
    
    # 返回响应
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=dict(response.headers)
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
EOF

# 启动网关
uvicorn gateway:app --host 0.0.0.0 --port 8080

3. Kong Gateway (Python客户端)

Kong是最受欢迎的API网关之一,可以通过Python客户端进行管理。

安装和使用

bash
# 安装Kong客户端
pip install kong

# 使用Python管理Kong
cat > kong_setup.py << EOF
from kong.client import Kong

# 连接到Kong Admin API
kong = Kong(api_url="http://kong:8001")

# 创建服务
service = kong.services.create(
    name="deepseek-api",
    url="http://deepseek-backend:8000"
)

# 添加路由
route = kong.routes.create(
    service_id=service.id,
    paths=["/v1/chat/completions"],
    methods=["POST"]
)

# 添加认证插件
plugin = kong.plugins.create(
    service_id=service.id,
    name="key-auth"
)

# 创建消费者
consumer = kong.consumers.create(username="deepseek-client")

# 为消费者生成API密钥
key_auth = kong.key_auths.create(consumer_id=consumer.id)
api_key = key_auth.key

print(f"API Key: {api_key}")
EOF

# 运行Kong设置脚本
python kong_setup.py

API网关的最新发展趋势

随着大模型API服务的普及,API网关也在不断演进,以下是一些最新的发展趋势:

  1. 无服务器API网关:专为云原生和无服务器架构设计的轻量级API网关
  2. WebSocket支持增强:更好地支持长连接和实时通信,对流式LLM输出至关重要
  3. 边缘计算集成:将API网关部署到离用户更近的边缘节点,减少延迟
  4. AI驱动的自适应路由:使用机器学习优化请求路由和负载均衡决策
  5. 更精细的流量控制:基于请求内容、用户行为和系统负载进行智能流量管理

通过采用这些先进技术,开发者可以构建更加高效、可靠的大模型API服务架构,更好地满足用户需求。